package pulsar

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"gitee.com/xfrm/middleware/xconfig"
	"gitee.com/xfrm/middleware/xlog"
)

// instanceConfSep is the separator instanceConfFromString splits on when
// parsing a serialized instanceConf.
const instanceConfSep = "@"

var (
	// InvalidMqRoleTypeStringErr is returned by mqRoleTypeFromString when the
	// role string is neither "producer" nor "consumer".
	InvalidMqRoleTypeStringErr = errors.New("invalid mq role type string")
)

// Manager caches MQ instances keyed by their serialized instanceConf and
// re-creates them when the related configuration changes (see applyChange).
type Manager struct {
	// center is the config center that producer/consumer configuration is
	// loaded from.
	center    xconfig.ConfigCenter
	// instances maps an instance key (see buildKey) to the created instance.
	instances sync.Map
	// mutex serializes instance creation in get.
	mutex     sync.Mutex
}

// NewManager subscribes center to the MQ namespace, builds a Manager on top
// of it and registers the manager's change observer with the config center.
//
// It returns the subscription error unchanged when SubscribeNamespaces fails;
// in that case no observer is registered.
func NewManager(ctx context.Context, center xconfig.ConfigCenter) (*Manager, error) {
	if err := center.SubscribeNamespaces(ctx, []string{mqNamespace}); err != nil {
		return nil, err
	}

	m := &Manager{center: center}
	m.center.RegisterObserver(ctx, m.observer())
	return m, nil
}

// newInstance builds a fresh producer or consumer for conf.
//
// The configuration is loaded from the manager's config center: by topic for
// a producer, by topic and group for a consumer. An error is returned when the
// configuration cannot be loaded, when the constructor fails, or when
// conf.role is neither the producer nor the consumer role.
func (p *Manager) newInstance(ctx context.Context, conf *instanceConf) (interface{}, error) {
	fun := "newInstance"

	if conf.role == producerRoleType {
		producerConf, err := getProducerConfig(ctx, conf.topic, p.center)
		if err != nil {
			return nil, fmt.Errorf("%s get producer config error, err: %v", fun, err)
		}
		return NewProducerWithConfig(ctx, producerConf)
	}

	if conf.role == consumerRoleType {
		consumerConf, err := getConsumerConfig(ctx, conf.topic, conf.group, p.center)
		if err != nil {
			return nil, fmt.Errorf("%s get consumer config error, err: %v", fun, err)
		}
		return NewConsumerWithConfig(ctx, consumerConf)
	}

	return nil, fmt.Errorf("role %d error", conf.role)
}

// get returns the cached instance for conf, creating it on first use.
//
// The cache is consulted once without the lock and once more after acquiring
// p.mutex, so a key is only built by one caller at a time. Creation is retried
// with the back-off supplied by getSleepTime until it succeeds, getSleepTime
// reports that the retry budget is exhausted, or ctx is done.
//
// On failure the returned error carries the last error from newInstance, so
// the caller can see why creation failed and not only that retries ran out.
//
// NOTE(review): p.mutex is held for the whole retry loop, so a slow or failing
// creation delays get calls for every other uncached key as well.
func (p *Manager) get(ctx context.Context, conf *instanceConf) (in interface{}, err error) {
	fun := "Manager.get -->"

	key := p.buildKey(conf)
	if cached, ok := p.instances.Load(key); ok {
		return cached, nil
	}

	p.mutex.Lock()
	defer p.mutex.Unlock()

	// Another caller may have created the instance while we waited for the lock.
	if cached, ok := p.instances.Load(key); ok {
		return cached, nil
	}

	xlog.Infof(ctx, "%s newInstance, key: %s", fun, key)

	var lastErr error
	for retryTimes := 1; ; retryTimes++ {
		sleepTime, limitErr := p.getSleepTime(retryTimes)
		if limitErr != nil {
			if lastErr == nil {
				return nil, limitErr
			}
			return nil, fmt.Errorf("%v, key: %s, last error: %v", limitErr, key, lastErr)
		}

		created, newErr := p.newInstance(ctx, conf)
		if newErr == nil {
			in, _ = p.instances.LoadOrStore(key, created)
			return in, nil
		}
		lastErr = newErr
		xlog.Errorf(ctx, "%s newInstance, key: %s, error:%v, sleepTime: %d", fun, key, newErr, sleepTime)

		// Back off before the next attempt, but stop waiting as soon as the
		// caller's context is cancelled or its deadline passes.
		timer := time.NewTimer(time.Duration(sleepTime) * time.Second)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, fmt.Errorf("%s newInstance aborted, key: %s, err: %v, last error: %v", fun, key, ctx.Err(), lastErr)
		case <-timer.C:
		}
	}
}

// getProducer returns the producer instance for conf, creating it through get
// when it is not cached yet.
//
// It fails when get fails or when the instance stored for conf is not a
// *Producer.
func (p *Manager) getProducer(ctx context.Context, conf *instanceConf) (*Producer, error) {
	fun := "Manager.getProducer -->"

	instance, err := p.get(ctx, conf)
	if err != nil {
		return nil, err
	}

	if producer, ok := instance.(*Producer); ok {
		return producer, nil
	}
	return nil, fmt.Errorf("%s in.(Producer) err, topic: %s", fun, conf.topic)
}

// getConsumer returns the consumer instance for conf, creating it through get
// when it is not cached yet.
//
// It fails when get fails or when the instance stored for conf is not a
// *Consumer.
func (p *Manager) getConsumer(ctx context.Context, conf *instanceConf) (*Consumer, error) {
	fun := "Manager.getConsumer -->"

	instance, err := p.get(ctx, conf)
	if err != nil {
		return nil, err
	}

	if consumer, ok := instance.(*Consumer); ok {
		return consumer, nil
	}
	return nil, fmt.Errorf("%s in.(Consumer) err, topic: %s", fun, conf.topic)
}

// observer builds the config observer that forwards every change in the MQ
// namespace to applyChange, one call per changed key.
func (p *Manager) observer() *xconfig.ConfigObserver {
	onChange := func(ctx context.Context, event *xconfig.ChangeEvent) {
		// Only changes under the MQ namespace are relevant here.
		if event.Namespace == mqNamespace {
			for changedKey, change := range event.Changes {
				p.applyChange(ctx, changedKey, change)
			}
		}
	}
	return xconfig.NewConfigObserver(onChange)
}

// applyChange rebuilds every cached instance affected by a configuration
// change.
//
// key is the changed configuration key. It is parsed once with
// parseTopicConfigKey and compared against each cached instance; an instance
// whose topic, lane and role all match is re-created with newInstance, stored
// under the same instance key, and the previous instance is then closed.
// Failures are logged and never abort the iteration over the other instances.
func (p *Manager) applyChange(ctx context.Context, key string, change *xconfig.Change) {
	fun := "Manager.applyChange -->"
	xlog.Infof(ctx, " %s change:%v to key:%s", fun, change, key)

	// The changed key is the same for every instance, so parse it only once.
	apolloMQKey, err := parseTopicConfigKey(key)
	if err != nil {
		xlog.Errorf(ctx, "%s failed parse apollo mq config key: %s, err: %v", fun, key, err)
		return
	}

	p.instances.Range(func(instanceKey, value interface{}) bool {
		sk, ok := instanceKey.(string)
		if !ok {
			// Report the offending instance key, not the changed config key.
			xlog.Errorf(ctx, "%s key:%v should be string", fun, instanceKey)
			return true
		}

		conf, err := p.confFromKey(sk)
		if err != nil {
			xlog.Errorf(ctx, "%s failed to convert key:%s to conf, err: %v", fun, sk, err)
			return true
		}

		// Same lane, same topic and same role: the change applies to this instance.
		if conf.topic != apolloMQKey.topic || conf.lane != apolloMQKey.lane || conf.role != apolloMQKey.role {
			return true
		}

		xlog.Infof(ctx, "%s update instance:%s", fun, sk)
		// Build the new instance first, publish it, and only then close the
		// old one, so the key always has an instance stored under it.
		in, err := p.newInstance(ctx, conf)
		if err != nil {
			xlog.Errorf(ctx, "%s new instance, conf: %s, err %v", fun, conf.String(), err)
			return true
		}
		p.instances.Store(instanceKey, in)
		if err = p.closeInstance(ctx, value, conf); err != nil {
			xlog.Errorf(ctx, "%s close instance err:%v", fun, err)
		}
		return true
	})
}

// Close closes every cached instance and removes it from the cache.
//
// An entry whose key is not a string, or cannot be parsed back into an
// instanceConf, is logged and skipped; it no longer stops the iteration, so
// the remaining instances are still closed. Errors from closing an instance
// are logged and the entry is removed regardless.
func (p *Manager) Close() {
	fun := "Manager.Close -->"

	ctx := context.TODO()

	p.instances.Range(func(key, value interface{}) bool {
		xlog.Infof(ctx, "%s key:%v", fun, key)

		skey, ok := key.(string)
		if !ok {
			xlog.Errorf(ctx, "%s key:%v should be string", fun, key)
			return true
		}

		conf, err := p.confFromKey(skey)
		if err != nil {
			xlog.Errorf(ctx, "%s key:%v, err:%s", fun, key, err)
			return true
		}

		if err = p.closeInstance(ctx, value, conf); err != nil {
			xlog.Errorf(ctx, "%s close instance err:%v", fun, err)
		}

		p.instances.Delete(key)
		return true
	})
}

// buildKey returns the cache key for conf, which is its String form.
func (p *Manager) buildKey(conf *instanceConf) string {
	return conf.String()
}

// confFromKey parses a cache key back into an instanceConf by delegating to
// instanceConfFromString.
func (p *Manager) confFromKey(key string) (*instanceConf, error) {
	return instanceConfFromString(key)
}

// closeInstance closes instance according to the role recorded in conf.
//
// For the consumer role instance must be a *Consumer, for the producer role a
// *Producer; a mismatch is reported as an error. Any other role is a no-op
// that returns nil. Otherwise the result of the instance's Close is returned.
func (p *Manager) closeInstance(ctx context.Context, instance interface{}, conf *instanceConf) error {
	fun := "Manager.closeInstance-->"

	switch conf.role {
	case consumerRoleType:
		if c, ok := instance.(*Consumer); ok {
			return c.Close()
		}
		return fmt.Errorf("%s instance:%#v should be consumer", fun, instance)
	case producerRoleType:
		if pr, ok := instance.(*Producer); ok {
			return pr.Close()
		}
		return fmt.Errorf("%s instance:%#v should be producer", fun, instance)
	}

	return nil
}

// instanceConf identifies one MQ instance handled by Manager. Its String form
// is used as the key under which the instance is cached.
type instanceConf struct {
	// group is passed to getConsumerConfig when a consumer is created.
	group string
	// role selects whether a producer or a consumer is created.
	role  roleType
	// topic is the topic whose configuration is loaded for the instance.
	topic string
	// lane is compared with the lane of a changed config key in applyChange.
	lane  string
}

// String serializes the conf as group, role, topic and lane joined by
// instanceConfSep. instanceConfFromString parses this form back, so both
// sides now share the same separator constant instead of a hard-coded "@".
func (c *instanceConf) String() string {
	// fmt.Sprint formats role the same way the previous %s verb did for a
	// string-like or Stringer role type.
	fields := []string{c.group, fmt.Sprint(c.role), c.topic, c.lane}
	return strings.Join(fields, instanceConfSep)
}

// instanceConfFromString parses the serialized form produced by
// instanceConf.String back into an instanceConf.
//
// The expected layout is group@role@topic@lane. Fewer than four fields is an
// error; fields beyond the fourth are ignored. An unknown role yields the
// error returned by mqRoleTypeFromString.
func instanceConfFromString(s string) (conf *instanceConf, err error) {
	parts := strings.Split(s, instanceConfSep)
	if len(parts) < 4 {
		return nil, fmt.Errorf("invalid instance conf string:%s", s)
	}

	role, err := mqRoleTypeFromString(parts[1])
	if err != nil {
		return nil, err
	}

	return &instanceConf{
		group: parts[0],
		role:  role,
		topic: parts[2],
		lane:  parts[3],
	}, nil
}

// mqRoleTypeFromString converts the textual role ("producer" or "consumer")
// into its roleType. Any other input returns the zero roleType together with
// InvalidMqRoleTypeStringErr.
func mqRoleTypeFromString(it string) (t roleType, err error) {
	if it == "producer" {
		return producerRoleType, nil
	}
	if it == "consumer" {
		return consumerRoleType, nil
	}
	return t, InvalidMqRoleTypeStringErr
}

// getSleepTime returns the back-off, in seconds, to wait after the given
// attempt number: retryTimes plus half of it (integer division), but never
// less than retryTimes itself.
//
// It returns an error once retryTimes exceeds 10, which callers use as the
// signal to stop retrying.
func (p *Manager) getSleepTime(retryTimes int) (int64, error) {
	fun := "getSleepTime -->"

	if retryTimes > 10 {
		return 0, fmt.Errorf("%s retry times over", fun)
	}

	backoff := retryTimes + retryTimes/2
	if backoff >= retryTimes {
		return int64(backoff), nil
	}
	// Only reachable for negative input, where adding the half makes the
	// value smaller; fall back to retryTimes itself.
	return int64(retryTimes), nil
}
